package com.supply.hsa.link.task.fetch.hotel.analyze;

import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;

import com.supply.hsa.link.common.constant.SupplierClass;
import com.supply.hsa.link.task.fetch.CommonConstant;
import com.supply.hsa.link.task.fetch.TaskExecutor;
import com.supply.hsa.link.task.fetch.TaskManager;
import com.supply.hsa.link.task.fetch.exception.TaskException;
import com.supply.hsa.link.task.fetch.hotel.container.SupplyHotelProcessorContainer;
import com.supply.hsa.link.task.fetch.hotel.container.TaskHandlerContainer;
import com.supply.hsa.link.task.fetch.hotel.dto.CommonAnalyzeRequestDTO;
import com.supply.hsa.link.task.fetch.hotel.dto.CommonSupplyFetchResultWrapper;
import com.supply.hsa.link.task.fetch.WorkerStatus;
import com.supply.hsa.link.task.fetch.hotel.dto.PricePlanMiddleDto;
import com.supply.hsa.link.task.fetch.process.SupplyHotelProcessor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Service;

/**
 * Executor for {@link AnalyzeHotelTask}s: fetched supplier data is converted into the
 * intermediate price-plan model and then persisted by the supplier's
 * {@link SupplyHotelProcessor}.
 *
 * <p>Once the application context named by
 * {@link CommonConstant#SPRING_APPLICATION_CONTEXT_NAME} has been refreshed, one FIFO queue is
 * created per supplier returned by {@link TaskHandlerContainer#getRegisteredSupplier()} and
 * {@code consumerThreadNum} consumer threads are started for each queue. Producers hand tasks
 * over through {@link #add(AnalyzeHotelTask)}; consumers run them through
 * {@link #execute(AnalyzeHotelTask)}.
 */
@SuppressWarnings("rawtypes")
@Service
@Slf4j
public class AnalyzeHotelTaskExecutor implements TaskExecutor<AnalyzeHotelTask>, ApplicationListener<ContextRefreshedEvent> {
	@Autowired
	private SupplyHotelProcessorContainer supplyHotelProcessorContainer;

	@Autowired
	private TaskHandlerContainer taskHandlerContainer;

	// FIFO queues (one per supplier) of tasks waiting to be analyzed and persisted.
	// The map is fully populated before it is assigned and is never modified afterwards;
	// volatile makes that single assignment visible to producer threads calling add().
	private volatile Map<SupplierClass, LinkedBlockingQueue<AnalyzeHotelTask>> taskQueue;

	// Number of consumer threads started per supplier.
	private int consumerThreadNum = 4;

	/**
	 * Creates the per-supplier queues and starts their consumer threads.
	 *
	 * <p>Events from other application contexts are ignored, and so are repeated refresh
	 * events once the queues exist, so consumers are never started twice.
	 *
	 * @param event the context-refreshed event
	 */
	@Override
	public void onApplicationEvent(ContextRefreshedEvent event) {
		if(!event.getApplicationContext().getDisplayName().equals(CommonConstant.SPRING_APPLICATION_CONTEXT_NAME)){
			return;
		}
		if(taskQueue != null){
			// Already initialized by an earlier refresh of the same context.
			return;
		}

		Set<SupplierClass> supplierClasseSet = this.taskHandlerContainer.getRegisteredSupplier();
		if(!CollectionUtils.isNotEmpty(supplierClasseSet)){
			return;
		}

		// Build every queue first and publish the finished map in one step. Starting
		// consumers while the map is still being filled would let them read a HashMap
		// that another thread is concurrently modifying.
		Map<SupplierClass, LinkedBlockingQueue<AnalyzeHotelTask>> queues =
				new HashMap<SupplierClass, LinkedBlockingQueue<AnalyzeHotelTask>>(supplierClasseSet.size());
		for(SupplierClass supplierClass : supplierClasseSet){
			queues.put(supplierClass, new LinkedBlockingQueue<AnalyzeHotelTask>());
		}
		taskQueue = queues;

		for(SupplierClass supplierClass : supplierClasseSet){
			this.startConsumers(supplierClass, queues.get(supplierClass));
		}
	}

	/**
	 * Starts {@code consumerThreadNum} consumer threads draining the given supplier queue.
	 * Consumers block while the queue is empty.
	 */
	private void startConsumers(SupplierClass supplierClass, LinkedBlockingQueue<AnalyzeHotelTask> queue) {
		for(int i=1; i<=consumerThreadNum; i++){
			AnalyzeTaskConsumer consumer = new AnalyzeTaskConsumer(supplierClass, queue);
			consumer.setName(supplierClass.getSupplierClass()+"-AnalyzeTaskConsumer-"+i);
			consumer.setUncaughtExceptionHandler(new UncaughtExceptionHandler(){
				@Override
				public void uncaughtException(Thread t, Throwable e) {
					log.error("线程堵塞{}",t, e);
				}
			});
			consumer.start();
		}
	}

	/**
	 * Worker thread that takes tasks from one supplier's queue and executes them until it
	 * is interrupted.
	 */
	private class AnalyzeTaskConsumer extends Thread{
		private final SupplierClass supplierClass;
		// Held directly so the consumer never has to look the queue up in the shared map.
		private final LinkedBlockingQueue<AnalyzeHotelTask> queue;

		AnalyzeTaskConsumer(SupplierClass supplierClass, LinkedBlockingQueue<AnalyzeHotelTask> queue){
			this.supplierClass = supplierClass;
			this.queue = queue;
		}

		@Override
		public void run() {
			while (!Thread.currentThread().isInterrupted()) {
				AnalyzeHotelTask fetchRemoteTask;
				try {
					// Remove and return the head of the queue, waiting if it is empty.
					fetchRemoteTask = queue.take();
				} catch (InterruptedException e) {
					log.error("移除并获取队列头部任务异常，供应商：{}",supplierClass.getSupplierClass(),e);
					// Restore the flag and stop: an interrupt is a request to shut down.
					Thread.currentThread().interrupt();
					return;
				}

				try {
					execute(fetchRemoteTask);
				} catch (RuntimeException e) {
					// A single failing task must not terminate the consumer, otherwise
					// the queue would stop being drained.
					log.error("执行解析任务异常，供应商：{}",supplierClass.getSupplierClass(),e);
				}
			}
		}
	}

	/**
	 * Returns {@code true} when the task and the fields required for analysis (supplier,
	 * fetch request, fetch response) are all non-null.
	 */
	private boolean validTask(AnalyzeHotelTask task){
		return task != null
				&& task.getSupplierClass() != null
				&& task.getFetchRequest() != null
				&& task.getFetchResponse() != null;
	}

	/**
	 * Appends the task to the tail of its supplier's queue, then pauses the calling thread
	 * for 200 ms.
	 *
	 * <p>Invalid tasks are ignored. A task whose supplier has no queue (queues not yet
	 * initialized, or supplier not registered) is logged and dropped.
	 *
	 * @param task the task to enqueue
	 */
	@Override
	public void add(AnalyzeHotelTask task) {
		if(!this.validTask(task)){
			return;
		}

		Map<SupplierClass, LinkedBlockingQueue<AnalyzeHotelTask>> queues = taskQueue;
		LinkedBlockingQueue<AnalyzeHotelTask> queue = queues != null ? queues.get(task.getSupplierClass()) : null;
		if(queue == null){
			log.error("添加任务失败，供应商未注册任务队列：{}",task.getSupplierClass());
			return;
		}

		try {
			// Append the task to the tail of the queue.
			queue.put(task);

			Thread.sleep(200);
		} catch (InterruptedException e) {
			log.error("添加任务失败，供应商：{}",task.getSupplierClass(),e);
			// Keep the interrupt visible to the caller.
			Thread.currentThread().interrupt();
		}
	}

	/**
	 * Runs one analysis task: registers it with the {@link TaskManager}, builds the analyze
	 * request, converts the fetched data, records the result counters, persists the data and
	 * finally marks the task finished.
	 *
	 * <p>Any exception raised after registration is logged and recorded on the task as a
	 * failure; the task is always finished (and therefore unregistered) afterwards.
	 *
	 * @param task the task to execute; invalid tasks are ignored
	 * @throws TaskException if no {@link SupplyHotelProcessor} is registered for the task's supplier
	 */
	@SuppressWarnings("unchecked")
	@Override
	public void execute(AnalyzeHotelTask task) {
		if(!this.validTask(task)){
			return;
		}

		SupplyHotelProcessor<PricePlanMiddleDto> supplyHotelInfoProcessor = this.supplyHotelProcessorContainer.getHotelInfoProcessor(task.getSupplierClass());
		if(supplyHotelInfoProcessor == null){
			throw new TaskException(task.getSupplierClass() + "未注册SupplyHotelProcessor,不能执行数据解析任务");
		}

		// Initialize the task.
		task.initTask();
		// Task type.
		task.setTaskName(CommonConstant.TaskType.ANALYZE_HOTEL_TASK);
		// Register the task.
		TaskManager.getInstance().register(task);

		try {
			task.setAnalyzeRequest(this.buildAnalyzeRequest(task));

			// Convert the fetched data.
			supplyHotelInfoProcessor.convertSupplyHotelInfo(task);

			// Record the parsed intermediate price-plan data.
			this.recordPricePlanMiddleCounts(task);

			// Persist the data.
			supplyHotelInfoProcessor.processSupplyHotelInfo(task);

			this.recordSyncProductDbCount(task);

			task.setResult(true);
		} catch (Exception e) {
			log.error("解析数据失败，供应商：{}",task.getSupplierClass(),e);

			// Some exceptions (e.g. NullPointerException) carry no message.
			task.setFailReason(e.getMessage() != null ? e.getMessage() : e.toString());
			task.setResult(false);
		} finally {
			// Always unregister, so a failed task is not left in the TaskManager.
			this.finish(task);
		}
	}

	/**
	 * Builds the analyze request from the task's fetch request and flattens the result lists
	 * of all fetch-result wrappers into a single response list.
	 */
	private CommonAnalyzeRequestDTO buildAnalyzeRequest(AnalyzeHotelTask task) {
		CommonAnalyzeRequestDTO analyzeRequest = new CommonAnalyzeRequestDTO();
		analyzeRequest.setTaskType(task.getTaskType());
		analyzeRequest.setSupplyClass(task.getSupplierClass().getSupplierClass());
		analyzeRequest.setSupplyCode(task.getFetchRequest().getSupplyCode());
		analyzeRequest.setCheckInDate(task.getFetchRequest().getBeginDate());
		analyzeRequest.setCheckOutDate(task.getFetchRequest().getEndDate());
		analyzeRequest.setWithoutJson(true);// Request and response values are not processed as JSON.
		analyzeRequest.setRequest(task.getFetchRequest());

		List<Object> resultObjectList = new ArrayList<Object>();
		if(task.getFetchResponse().getResultList() != null){
			for (CommonSupplyFetchResultWrapper wrapper : task.getFetchResponse().getResultList()) {
				if(wrapper != null && CollectionUtils.isNotEmpty(wrapper.getResultList())){
					resultObjectList.addAll(wrapper.getResultList());
				}
			}
		}
		analyzeRequest.setResponseList(resultObjectList);

		return analyzeRequest;
	}

	/**
	 * Makes sure every requested hotel id has an entry in the task's price-plan map (an empty
	 * list when nothing was parsed for it), then stores the per-hotel sizes and their total
	 * on the task result.
	 */
	@SuppressWarnings("unchecked")
	private void recordPricePlanMiddleCounts(AnalyzeHotelTask task) {
		// The hotel id field holds several ids when a split character is configured.
		String[] hotelIdList = null;
		if(StringUtils.isNotBlank(task.getFetchRequest().getHotelIdSplitChar())){
			hotelIdList = StringUtils.split(task.getFetchRequest().getHotelId(), task.getFetchRequest().getHotelIdSplitChar());
		}else{
			hotelIdList = new String[]{task.getFetchRequest().getHotelId()};
		}

		if(task.getPricePlanMiddleListMap() == null){
			task.setPricePlanMiddleListMap(new HashMap<String, List>(hotelIdList.length));
		}

		for(String hotelId : hotelIdList){
			if(task.getPricePlanMiddleListMap().get(hotelId) == null){
				task.getPricePlanMiddleListMap().put(hotelId, new ArrayList(1));
			}
		}

		long pricePlanMiddleCount = 0;
		Map<String, Integer> dataSizeMap = new HashMap<String, Integer>(task.getPricePlanMiddleListMap().size());
		for(Map.Entry<String, List> entry : ((Map<String, List>)task.getPricePlanMiddleListMap()).entrySet()){
			int valueCount = entry.getValue()!=null ? entry.getValue().size() : 0;
			dataSizeMap.put(entry.getKey(), valueCount);
			pricePlanMiddleCount += valueCount;
		}

		task.getTaskResult().setPricePlanMiddle(dataSizeMap);
		task.getTaskResult().setPricePlanMiddleCount(pricePlanMiddleCount);
	}

	/**
	 * Sums the per-key counts of the task result's sync-product-DB map and stores the total.
	 * Nothing is stored when that map is empty or missing.
	 */
	private void recordSyncProductDbCount(AnalyzeHotelTask task) {
		long syncProductDbCount = 0;
		if(MapUtils.isNotEmpty(task.getTaskResult().getSyncProductDB())){
			for(Integer cInteger : task.getTaskResult().getSyncProductDB().values()){
				// Skip missing counts instead of failing on unboxing.
				if(cInteger != null){
					syncProductDbCount += cInteger;
				}
			}
			task.getTaskResult().setSyncProductDbCount(syncProductDbCount);
		}
	}

	/** No-op: stopping is not supported by this executor. */
	@Override
	public void stop(AnalyzeHotelTask task) {
	}

	/** No-op: pausing is not supported by this executor. */
	@Override
	public void pause(AnalyzeHotelTask task) {
	}

	/** No-op: resuming is not supported by this executor. */
	@Override
	public void resume(AnalyzeHotelTask task) {
	}

	/**
	 * Stamps the end date, sets the work status to {@link WorkerStatus#FINISH} and
	 * unregisters the task from the {@link TaskManager}. Invalid tasks are ignored.
	 *
	 * @param task the task that has completed
	 */
	@Override
	public void finish(AnalyzeHotelTask task) {
		if(!this.validTask(task)){
			return;
		}

		task.setEndDate(new Date());
		task.setWorkStatus(WorkerStatus.FINISH);

		TaskManager taskManager = TaskManager.getInstance();
		// Parent-task counting is currently disabled:
//		taskManager.increment(taskManager.getTask(task.getParentTaskId()), task);

		taskManager.unregister(task);
	}

}
